package com.add

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

object MyMapAccumulator {
  /**
   * Demo entry point: runs a small list of integers through [[MyMapAccumulator]]
   * on a local Spark master with 4 threads (2 partitions) and prints the
   * accumulated sum, count and average.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("MyAcc").setMaster("local[4]")
    val sc: SparkContext = new SparkContext(conf)
    // Stop the context even if the job below throws, so it is never leaked.
    try {
      val list1 = List(30, 50, 70, 60, 10, 20, 10, 30, 40, 50)
      val rdd1: RDD[Int] = sc.parallelize(list1, 2)

      // Register the custom accumulator with the context before using it in a job.
      val acc = new MyMapAccumulator
      sc.register(acc)
      rdd1.foreach(x => acc.add(x.toDouble))

      println(acc.value) // Expected output: Map(sum -> 370.0, count -> 10, avg -> 37.0)
    } finally {
      sc.stop()
    }
  }
}

/**
 * Accumulator that tracks the sum and the number of `Double` values added to it
 * and reports them together with their average.
 *
 * The reported value has the shape
 * `Map("sum" -> <Double>, "count" -> <Long>, "avg" -> <Double>)`,
 * e.g. `Map("sum" -> 1000.0, "count" -> 10L, "avg" -> 100.0)`.
 * When nothing has been accumulated (count is 0) the value is an empty map,
 * so no meaningless `0.0 / 0` (NaN) average is reported.
 *
 * Only the sum and the count are stored; the average is derived on every read of
 * [[value]] without mutating the accumulator. Reading the value therefore never
 * affects [[isZero]], [[copy]] or [[merge]].
 */
class MyMapAccumulator extends AccumulatorV2[Double, Map[String, Any]] {
  // Total of all values added locally or merged in from other accumulators.
  private var sum: Double = 0D
  // Number of values added locally or merged in from other accumulators.
  private var count: Long = 0L

  /** True while this accumulator holds its zero value (nothing accumulated). */
  override def isZero: Boolean = count == 0L && sum == 0D

  /**
   * Returns a new accumulator holding the same sum and count as this one.
   * Prints "copy..." so the call can be observed in the demo output.
   */
  override def copy(): AccumulatorV2[Double, Map[String, Any]] = {
    println("copy...")
    val acc = new MyMapAccumulator
    acc.sum = sum
    acc.count = count
    acc
  }

  /**
   * Resets this accumulator to its zero value.
   * Prints "reset..." so the call can be observed in the demo output.
   */
  override def reset(): Unit = {
    println("reset...")
    sum = 0D
    count = 0L
  }

  /**
   * Accumulates one value: adds it to the sum and increments the count.
   * The average is not kept here; it is derived in [[value]].
   *
   * @param v the value to accumulate
   */
  override def add(v: Double): Unit = {
    sum += v
    count += 1L
  }

  /**
   * Folds another accumulator's partial result (e.g. from another partition)
   * into this one by adding its sum and count.
   *
   * @param other the accumulator to merge; must be a [[MyMapAccumulator]]
   * @throws UnsupportedOperationException if `other` is any other accumulator type
   */
  override def merge(other: AccumulatorV2[Double, Map[String, Any]]): Unit = other match {
    case o: MyMapAccumulator =>
      sum += o.sum
      count += o.count
    case _ =>
      val otherType = Option(other).fold("null")(_.getClass.getName)
      throw new UnsupportedOperationException(
        s"Cannot merge ${getClass.getName} with $otherType")
  }

  /**
   * The current result as `Map("sum" -> Double, "count" -> Long, "avg" -> Double)`,
   * or an empty map when nothing has been accumulated.
   * This is a pure read: it does not modify the accumulator.
   *
   * @return the accumulated sum, count and average
   */
  override def value: Map[String, Any] =
    if (count == 0L) Map.empty[String, Any]
    else Map[String, Any]("sum" -> sum, "count" -> count, "avg" -> sum / count)
}

